## Effect Types

/// Observable[T,E] represents an effect that may return 0/finite/infinite
/// T values, or throw an E error half-way, synchronously or asynchronously.
/// Type parameters: T (value type) defaults to unit and E (error type)
/// defaults to never; both carry the `out` variance marker.
/// The type is declared native instead of being defined over another type.
type Observable [out T = unit, out E = never]
    native; // rx.Observable

/// Async[T,E] represents an effect that may return a T value,
/// or throw an E error, synchronously or asynchronously.
/// It is a subtype of Observable[T,E] and can be regarded as a lazy Promise.
/// As with Observable, T defaults to unit and E defaults to never.
/// The inner type Observable[T,E] is wrapped with the `protected` modifier.
type Async [out T = unit, out E = never]
    protected
    Observable[T,E];

/// Sync[T] represents an effect that returns a T value synchronously.
/// It is a subtype of Async[T]. Different from general Async, a Sync
/// is guaranteed synchronous and cannot be cancelled half-way.
/// There is a `sync` function, which can be used to concatenate
/// multiple Sync effects into a single Sync effect.
/// Sync has no error type parameter: its inner type is Async[T], which
/// leaves the error type at Async's default (never). T defaults to unit.
type Sync [out T = unit]
    protected
    Async[T];

/// Source[T] represents an effect that subscribes to a discrete data source.
/// It is a subtype of Observable[T]. Different from general Observable, a Source
/// never terminates.
/// Its inner type is Observable[T], which leaves the error type at
/// Observable's default (never). Unlike Observable, T has no default here.
type Source [out T]
    protected
    Observable[T];

/// Computed[T] represents an effect that subscribes to a continuous data source.
/// It is a subtype of Source[T]. Different from general Source, a Computed
/// holds a current value and always produces its current value synchronously
/// as its first value.
/// The inner type Source[T] is wrapped with the `protected` modifier;
/// T carries the `out` variance marker and has no default.
type Computed [out T]
    protected
    Source[T];


## Wired Components

/// Sink[T] accepts values of type T.
/// T carries the `in` variance marker (the effect types in this file use
/// `out` instead). The type is declared native.
type Sink [in T]
    native; // rx.Sink

/// Bus[T] accepts values of type T and provides accepted T values,
/// similar to the concept of Subject in ReactiveX.
/// T is declared without a variance marker. The inner type is Sink[T],
/// wrapped with the `protected` modifier.
type Bus[T]
    protected Sink[T]; // rx.Bus

/// Reactive[T] represents a reactively mutable T value,
/// similar to the concept of BehaviorSubject in ReactiveX.
/// The inner type is Bus[T], wrapped with the `protected` modifier.
type Reactive[T]
    protected Bus[T]; // rx.Reactive

/// ReactiveEntity[T] is a subtype of Reactive[T],
/// which represents an independent reactive entity holding a mutable T value.
/// That is, different from the general concept of Reactive,
/// a ReactiveEntity is not derived from other Reactive values.
/// As a consequence, a ReactiveEntity can store snapshots of previous values
/// so that it is possible to perform undo/redo operations on it.
/// The inner type is Reactive[T], wrapped with the `protected` modifier.
/// The `Reactive` constructor function in this file returns this type.
type ReactiveEntity[T]
    protected Reactive[T];  // rx.ReactiveEntity


## Synchronization Types

/// Mutex[T] is a native type with one type parameter T (marked `out`).
/// In this file it is handed out by the `Mutex` function and consumed
/// by the `lock` function.
/// NOTE(review): presumably guards a T value against concurrent access;
/// confirm against the native implementation.
type Mutex [out T]
    native;


## Mutable Containers

/// Buffer is a native type with no type parameters.
/// NOTE(review): no operations on Buffer are visible in this source (the
/// "Mutable Containers Operations" section is still a TODO).
type Buffer native;  // rx.Buffer


## Wired Components Operations
/// connect(source, sink): takes a Source[T] and a Sink[T] and returns an
/// Async[never]. Bound to the native implementation 'connect'.
/// NOTE(review): presumably forwards each value of the source into the sink
/// and never completes (hence `never`); confirm against the native code.
export function connect:[T]
    &(Source[T], Sink[T]) => Async[never]
    native 'connect';
/// connect(bus, sink): converts the bus with `watch` and connects the
/// result to the sink.
export function connect:[T]
    &(Bus[T], Sink[T]) => Async[never]
    &(bus, sink) => ({watch(bus)} connect sink);
/// connect(source, receiver): wraps the receiver function (T => Async) into
/// a Sink with `Callback` and connects the source to it.
export function connect:[T]
    &(Source[T], &(T) => Async) => Async[never]
    &(source, receiver) => (source connect {Callback(receiver)});
/// connect(bus, receiver): applies both conversions, `watch` on the bus
/// and `Callback` on the receiver function, and connects the results.
export function connect:[T]
    &(Bus[T], &(T) => Async) => Async[never]
    &(bus, receiver) => ({watch(bus)} connect {Callback(receiver)});
/// connect(source, receiver) for Source[unit]: the receiver is an Async effect
/// rather than a function; it is wrapped as `&() => receiver` and turned into
/// a Sink with `Callback::[unit]` before connecting.
export function connect:
    &(Source[unit], Async) => Async[never]
    &(source, receiver) => (source connect {Callback::[unit](&() => receiver)});
/// connect(bus, receiver) for Bus[unit]: same wrapping of the Async receiver
/// as the Source[unit] overload. The bus is passed to `connect` directly
/// here, without an explicit `watch`.
export function connect:
    &(Bus[unit], Async) => Async[never]
    &(bus, receiver) => (bus connect {Callback::[unit](&() => receiver)});
/// (sink <- value): takes a Sink[T] and a T and returns a Sync effect with
/// the default (unit) result. Bound to the native implementation 'sink-write'.
/// NOTE(review): presumably the value is written into the sink when the
/// returned effect is performed; confirm against the native code.
export function <- :
    [T] &(Sink[T], T) => Sync
    native 'sink-write';
/// adapt(sink, f): given a Sink[A] and a function from B to A, returns a
/// Sink[B]. Bound to the native implementation 'sink-adapt'.
/// NOTE(review): presumably each B written to the result is converted with f
/// and forwarded to the original sink; confirm against the native code.
export function adapt:[A,B]
    &(Sink[A], &(B) => A) => Sink[B]
    native 'sink-adapt';
/// watch(bus): takes a Bus[T] and returns a Source[T].
/// Bound to the native implementation 'bus-watch'.
export function watch:[T]
    &(Bus[T]) => Source[T]
    native 'bus-watch';
/// watch(reactive): takes a Reactive[T] and returns a Computed[T].
/// Bound to the same native implementation ('bus-watch') as the Bus overload;
/// only the declared types differ.
export function watch:[T]
    &(Reactive[T]) => Computed[T]
    native 'bus-watch';
/// update(reactive, f): takes a Reactive[T] and a function from T to T and
/// returns a Sync effect with the default (unit) result.
/// Bound to the native implementation 'reactive-update'.
/// NOTE(review): presumably replaces the current value with f(current value);
/// confirm against the native code.
export function update:[T]
    &(Reactive[T], &(T) => T) => Sync
    native 'reactive-update';
/// adapt(reactive, f): given a Reactive[A] and a curried function
/// A => B => A, returns a Sink[B].
/// Bound to the native implementation 'reactive-adapt'.
/// NOTE(review): presumably f receives the current A and the incoming B and
/// yields the new A to store; confirm against the native code.
export function adapt:[A,B]
    &(Reactive[A], &(A) => &(B) => A) => Sink[B]
    native 'reactive-adapt';
/// morph(r, in, out): non-exported positional form. Takes a Reactive[A], a
/// curried function A => B => A and a function A => B, and returns a
/// Reactive[B]. Bound to the native implementation 'reactive-morph'.
/// NOTE(review): presumably `out` derives the B view and `in` folds a written
/// B back into A; confirm against the native code.
function morph:[A,B]
    &(Reactive[A], (&(A) => &(B) => A), (&(A) => B)) => Reactive[B]
    native 'reactive-morph';
/// morph(r, { in, out }): exported record form. Unpacks the `in` and `out`
/// fields of the record and passes them positionally to the overload above.
export function morph:[A,B]
    &(Reactive[A], { in: &(A) => &(B) => A, out: &(A) => B }) => Reactive[B]
    &(r, f) => { morph (r, f.in, f.out) };
/// proj(r, ref): derives a Reactive[B] from a Reactive[A] through `morph`.
/// `ref` maps an A to a ProjRef[A,B]. The `in` function applies `<-` to that
/// reference and a B to obtain an A; the `out` function applies `->` to the
/// reference to obtain a B.
/// NOTE(review): ProjRef and its `<-` / `->` operations are not declared in
/// the visible source; confirm their semantics where they are defined.
export function proj:[A,B]
    &(Reactive[A], &(A) => ProjRef[A,B]) => Reactive[B]
    &(r, ref) => (r morph {
        in:  (&a => &b => (a.{ref} <- b)) .[&(A) => &(B) => A],
        out: (&a => a.{ref}.{->}) .[&(A) => B]
    });
/// (-> reactive): takes a Reactive[T] and returns a Sync[T].
/// Bound to the native implementation 'reactive-read'.
/// NOTE(review): presumably yields the current value held by the Reactive;
/// confirm against the native code.
export function -> :
    [T] &(Reactive[T]) => Sync[T]
    native 'reactive-read';
// TODO: should provide record instead of tuple
/// consume(list, f): takes a Reactive[FlexList[T]] and a function that
/// receives a (FlexListKey, Computed[Number], Reactive[T]) tuple and returns
/// a Computed[X]; the result is a Computed[List[X]].
/// Bound to the native implementation 'reactive-flex-consume'.
/// NOTE(review): presumably the tuple holds an item's key, its index and a
/// Reactive view of the item; FlexList and FlexListKey are declared outside
/// the visible source, so confirm there.
export function consume:[T,X]
    &(Reactive[FlexList[T]], &(FlexListKey,Computed[Number],Reactive[T]) => Computed[X]) => Computed[List[X]]
    native 'reactive-flex-consume';
/// Blackhole(): takes no arguments and returns a Sink[any].
/// Bound to the native implementation 'Blackhole'.
/// NOTE(review): the name suggests a sink that discards everything written
/// to it; confirm against the native code.
export function Blackhole:
    &() => Sink[any]
    native 'Blackhole';
/// Callback(f): wraps a function from T to Async into a Sink[T].
/// Bound to the native implementation 'Callback'.
/// NOTE(review): presumably each value written to the sink triggers the
/// Async returned by f; confirm against the native code.
export function Callback:[T]
    &(&(T) => Async) => Sink[T]
    native 'Callback';
/// Bus(): takes no arguments and returns a Sync effect whose result is a
/// Bus[T]. Bound to the native implementation 'create-bus'.
export function Bus:[T]
    &() => Sync[Bus[T]]
    native 'create-bus';
/// Reactive(value): takes a T and returns a Sync effect whose result is a
/// ReactiveEntity[T]. Bound to the native implementation 'create-reactive'.
/// NOTE(review): presumably the argument is the initial value; confirm
/// against the native code.
export function Reactive:[T]
    &(T) => Sync[ReactiveEntity[T]]
    native 'create-reactive';

## Mutable Containers Operations
// TODO

## Locks Operations
/// Mutex(value, f): takes a T and a function from Mutex[T] to K and returns
/// K, where K is bounded by Observable[any,any].
/// Bound to the native implementation 'mutex'.
/// NOTE(review): presumably creates a mutex around the value and hands it to
/// f, scoping the mutex to the returned effect; confirm against native code.
export function Mutex: [T, K < Observable[any,any]]
    &(T, &(Mutex[T]) => K) => K
    native 'mutex';
/// lock(mutex, f): takes a Mutex[T] and a function from T to K and returns K.
/// Bound to the native implementation 'mutex-lock'.
/// NOTE(review): K is bounded by Observable[any] here (error type left at
/// its default, never), whereas the `Mutex` function bounds K by
/// Observable[any,any]; confirm whether the narrower bound is intentional.
export function lock: [T, K < Observable[any]]
    &(Mutex[T], &(T) => K) => K
    native 'mutex-lock';

## Effect Constructors
/// Source(observable): takes an Observable[T] (error type never) and returns
/// a Source[T]. Bound to the native implementation 'as-source'.
/// NOTE(review): how termination of the input is treated is not visible
/// here; confirm against the native code.
export function Source:[T]
    &(Observable[T]) => Source[T]
    native 'as-source';
/// Computed((a, b), f): takes a pair of Computed values and a function of
/// their two value types, and returns a Computed[X].
/// Bound to the native implementation 'computed'.
export function Computed:[A,B,X]
    &((Computed[A],Computed[B]), &(A,B) => X) => Computed[X]
    native 'computed';
/// Three-input variant of the overload above; same native binding.
export function Computed:[A,B,C,X]
    &((Computed[A],Computed[B],Computed[C]), &(A,B,C) => X) => Computed[X]
    native 'computed';
/// Four-input variant of the overload above; same native binding.
export function Computed:[A,B,C,D,X]
    &((Computed[A],Computed[B],Computed[C],Computed[D]), &(A,B,C,D) => X) => Computed[X]
    native 'computed';
/// with(effect, async): takes an effect of type A (bounded by
/// Observable[any,any]) and an Async, and returns a value of the same type A.
/// Bound to the native implementation 'with'.
/// NOTE(review): how the Async is scheduled relative to the effect is not
/// visible here; confirm against the native code.
export function with: [A < Observable[any,any]]
    &(A, Async) => A
    native 'with';
/// gen-random(): returns a Sync effect whose result is a NormalFloat.
/// Bound to the native implementation 'gen-random'.
/// NOTE(review): the range and distribution of the value are not visible
/// here; confirm against the native code.
export function gen-random:
    &() => Sync[NormalFloat]
    native 'gen-random';
/// gen-monotonic-id(): returns a Sync effect whose result is a Number.
/// Bound to the native implementation 'gen-monotonic-id'.
/// NOTE(review): the name suggests successive ids are increasing; confirm
/// against the native code.
export function gen-monotonic-id:
    &() => Sync[Number]
    native 'gen-monotonic-id';
/// gen-monotonic-id-string(): returns a Sync effect whose result is a String.
/// Bound to the native implementation 'gen-monotonic-id-string'.
/// NOTE(review): presumably the string form of a monotonic id; the exact
/// format is not visible here.
export function gen-monotonic-id-string:
    &() => Sync[String]
    native 'gen-monotonic-id-string';
/// crash(message): takes a String and returns a Sync[never].
/// Bound to the native implementation 'crash'.
/// NOTE(review): presumably aborts the program when performed; confirm.
export function crash:
    &(String) => Sync[never]
    native 'crash';
/// crash(error): same as above but takes an Error value; bound to the same
/// native implementation 'crash'.
export function crash:
    &(Error) => Sync[never]
    native 'crash';
/// go(f) returns an action that evaluates f() in a separate goroutine.
/// The thunk takes no arguments and returns T; the action is an Async[T].
/// Bound to the native implementation 'go-thunk'.
export function go:[T]
    &(&() => T) => Async[T]
    native 'go-thunk';
/// go(seq) returns an action that iterates seq in a separate goroutine.
/// The input is a Seq[T]; the action is an Observable[T].
/// Bound to the native implementation 'go-seq'.
export function go:[T]
    &(Seq[T]) => Observable[T]
    native 'go-seq';
// Noop is a Sync constant (default unit result) defined as `yield` applied
// to the unit value `()`.
export const Noop: Sync :=
    { yield () };
/// yield(value): takes a T and returns a Sync[T].
/// Bound to the native implementation 'yield'.
/// NOTE(review): presumably the effect simply produces the given value;
/// confirm against the native code.
export function yield:[T]
    &(T) => Sync[T]
    native 'yield';
/// yield*(seq): takes a Seq[T] and returns an Observable[T].
/// Bound to the native implementation 'yield*-seq'.
/// NOTE(review): presumably emits the elements in order; confirm.
export function yield*:[T]
    &(Seq[T]) => Observable[T]
    native 'yield*-seq';
/// yield*(list): takes a List[T] and returns an Observable[T].
/// Bound to the native implementation 'yield*-array'.
export function yield*:[T]
    &(List[T]) => Observable[T]
    native 'yield*-array';
/// not(p): lifts `not` to Computed[Bool] by mapping `not` over p.
/// The `not` passed to `map` is the overload on plain values, which is
/// declared outside the visible source.
export function not:
    &(Computed[Bool]) => Computed[Bool]
    &(p) => p.{map(not)};
/// and(p, q): lifts `and` to Computed[Bool]. Builds a Computed from the pair
/// (p, q) with the `Computed` constructor, applying `and` to the two values.
export function and:
    &(Computed[Bool], Computed[Bool]) => Computed[Bool]
    &(p,q) => { Computed ((p,q), &(p,q) => (p and q)) };
/// or(p, q): lifts `or` to Computed[Bool]. Builds a Computed from the pair
/// (p, q) with the `Computed` constructor, applying `or` to the two values.
export function or:
    &(Computed[Bool], Computed[Bool]) => Computed[Bool]
    &(p,q) => { Computed ((p,q), &(p,q) => (p or q)) };
/// start-with(observable, values): takes an Observable[T,E] and a List[T]
/// and returns an Observable[T,E].
/// Bound to the native implementation 'start-with'.
/// NOTE(review): presumably the listed values are emitted before those of
/// the observable; confirm against the native code.
export function start-with:[T,E]
    &(Observable[T,E], List[T]) => Observable[T,E]
    native 'start-with';
/// Computed(source, { beginning }): takes a Source[T] and a record with a
/// `beginning` field of type List[T], and returns a Computed[T].
/// Bound to the native implementation 'start-with-to-computed'.
/// NOTE(review): behaviour when `beginning` is empty is not visible here,
/// although a Computed is documented to always have a current value; confirm.
export function Computed:[T]
    &(Source[T], { beginning: List[T] }) => Computed[T]
    native 'start-with-to-computed';
/// wait { timeout }: takes a record with a Number field `timeout` and
/// returns an Async (default unit result).
/// Bound to the native implementation 'wait'.
/// NOTE(review): the time unit of `timeout` is not visible here; confirm.
export function wait:
    & { timeout: Number } => Async
    native 'wait';
/// tick { interval }: takes a record with a Number field `interval` and
/// returns a Source[unit]. Bound to the native implementation 'tick'.
/// NOTE(review): the time unit of `interval` is not visible here; confirm.
export function tick:
    & { interval: Number } => Source[unit]
    native 'tick';

## Effect Operators
/// (-> computed): takes a Computed[T] and returns a Sync[T].
/// Bound to the native implementation 'computed-read'.
/// NOTE(review): presumably yields the current value of the Computed;
/// confirm against the native code.
export function ->:
    [T] &(Computed[T]) => Sync[T]
    native 'computed-read';
/// take-one-as-single(observable): takes an Observable[T,E] and returns an
/// Async[Maybe[T],E]. Bound to the native implementation 'take-one-as-single'.
/// NOTE(review): presumably the result is the first value, or the empty
/// Maybe when the observable completes without emitting; confirm.
export function take-one-as-single:[T,E]
    &(Observable[T,E]) => Async[Maybe[T],E]
    native 'take-one-as-single';
/// wait-complete(observable): takes an Observable[unit,E] and returns an
/// Async[unit,E]. Bound to the native implementation 'wait-complete'.
/// NOTE(review): presumably the Async finishes when the observable
/// completes; confirm against the native code.
export function wait-complete:[E]
    &(Observable[unit,E]) => Async[unit,E]
    native 'wait-complete';
/// forever(observable): takes an Observable[unit,E] and returns an
/// Async[never,E]. Bound to the native implementation 'forever'.
/// NOTE(review): presumably repeats the observable indefinitely, ending only
/// on error (the result type is never); confirm against the native code.
export function forever:[E]
    &(Observable[unit,E]) => Async[never,E]
    native 'forever';

/// This function has two kinds of definitions:
/// 1. then(a1, r1 => a2) returns an action that performs a1 first and then
///   transforms r1 (the result of a1) into a2 and performs a2.
/// 2. then(a1, a2) returns an action that performs a1 and then performs a2.
/// In both cases, the returned action returns the result of a2.
/// Moreover, this function has an alias called `await`, which is intended
/// to be used within Flat CPS expressions.
///
/// Continuation form, Async result: both actions share the error type E.
/// Bound to the native implementation 'then'.
export function then:[A,B,E]
    &(Async[A,E], &(A) => Async[B,E]) => Async[B,E]
    native 'then';
/// Continuation form, Observable result: the continuation returns an
/// Observable[B,E]. Bound to the native implementation 'then'.
export function then:[A,B,E]
    &(Async[A,E], &(A) => Observable[B,E]) => Observable[B,E]
    native 'then';
/// Continuation form, Source result: no error type parameter is declared,
/// so the first action is an Async[A] with the default error type (never).
/// Bound to the native implementation 'then'.
export function then:[A,B]
    &(Async[A], &(A) => Source[B]) => Source[B]
    native 'then';
/// Shortcut form, Async result: the first action has a unit result and the
/// second action is passed directly instead of a continuation.
/// Bound to the native implementation 'then-shortcut'.
export function then:[E,B]
    &(Async[unit,E], Async[B,E]) => Async[B,E]
    native 'then-shortcut';
/// Shortcut form, Observable result.
/// Bound to the native implementation 'then-shortcut'.
export function then:[E,B]
    &(Async[unit,E], Observable[B,E]) => Observable[B,E]
    native 'then-shortcut';
/// Shortcut form, Source result: the first action is a plain Async
/// (unit result, error type never).
/// Bound to the native implementation 'then-shortcut'.
export function then:[B]
    &(Async, Source[B]) => Source[B]
    native 'then-shortcut';
alias await := then;

/// sync(s, f): continuation form. Takes a Sync[A] and a function from A to K
/// and returns K, where K is bounded by Observable[any,any].
/// Bound to the native implementation 'sync'.
/// NOTE(review): presumably the Sync counterpart of `then`, keeping the
/// combined effect synchronous when K is a Sync; confirm against native code.
export function sync: [A, K < Observable[any,any]]
    &(Sync[A], &(A) => K) => K
    native 'sync';
/// sync(s, k): shortcut form. Takes a Sync with unit result and an effect K
/// directly instead of a continuation.
/// Bound to the native implementation 'sync-shortcut'.
export function sync: [K < Observable[any,any]]
    &(Sync, K) => K
    native 'sync-shortcut';

/// catch(action, handler): takes an Async[T,E] and a handler from E to
/// Async[T,F]; the result is an Async[T,F], i.e. the error type changes from
/// E to F. Bound to the native implementation 'catch'.
/// NOTE(review): presumably the handler's action replaces the failed one;
/// confirm against the native code.
export function catch:[T,E,F]
    &(Async[T,E], &(E) => Async[T,F]) => Async[T,F]
    native 'catch';
/// Observable variant of the overload above; same native binding.
export function catch:[T,E,F]
    &(Observable[T,E], &(E) => Observable[T,F]) => Observable[T,F]
    native 'catch';
/// catch-retry(action, decide): takes an Async[T,E] and a function from E to
/// Async[Bool]; the result keeps the type Async[T,E].
/// Bound to the native implementation 'catch-retry'.
/// NOTE(review): presumably a true result retries the action and a false
/// result rethrows the error; confirm against the native code.
export function catch-retry:[T,E]
    &(Async[T,E], &(E) => Async[Bool]) => Async[T,E]
    native 'catch-retry';
/// Observable variant of the overload above; same native binding.
export function catch-retry:[T,E]
    &(Observable[T,E], &(E) => Async[Bool]) => Observable[T,E]
    native 'catch-retry';
/// catch-throw(action, convert): takes an Async[T,E] and a plain function
/// from E to F; the result is an Async[T,F].
/// Bound to the native implementation 'catch-throw'.
/// NOTE(review): presumably rethrows the converted error, i.e. maps the
/// error channel; confirm against the native code.
export function catch-throw:[T,E,F]
    &(Async[T,E], &(E) => F) => Async[T,F]
    native 'catch-throw';
/// Observable variant of the overload above; same native binding.
export function catch-throw:[T,E,F]
    &(Observable[T,E], &(E) => F) => Observable[T,F]
    native 'catch-throw';
/// crash-on-error(action): turns an Async[T,Error] into an Async[T] (error
/// type never) by catching the Error and passing it to `crash`.
export function crash-on-error:[T]
    &(Async[T,Error]) => Async[T]
    &(action) => action.{ catch &(err) => { crash err } };
/// Observable variant: turns an Observable[T,Error] into an Observable[T]
/// in the same way.
export function crash-on-error:[T]
    &(Observable[T,Error]) => Observable[T]
    &(action) => action.{ catch &(err) => { crash err } };
/// throw(error): takes an E and returns an Async[never,E].
/// Bound to the native implementation 'throw'.
/// NOTE(review): presumably the action fails with the given error when
/// performed; confirm against the native code.
export function throw:[E]
    &(E) => Async[never,E]
    native 'throw';
/// map(effect, f): takes an Async[A,E] and a function from A to B and
/// returns an Async[B,E]. Bound to the native implementation 'observable-map'.
export function map:[A,B,E]
    &(Async[A,E], &(A) => B) => Async[B,E]
    native 'observable-map';
/// Observable variant; same native binding ('observable-map').
export function map:[A,B,E]
    &(Observable[A,E], &(A) => B) => Observable[B,E]
    native 'observable-map';
/// Source variant; same native binding ('observable-map').
export function map:[A,B]
    &(Source[A], &(A) => B) => Source[B]
    native 'observable-map';
/// Computed variant. Unlike the other overloads it is bound to a separate
/// native implementation, 'computed-map'.
export function map:[A,B]
    &(Computed[A], &(A) => B) => Computed[B]
    native 'computed-map';
/// map-to(effect, value): maps over an Async[A,E] with a function that
/// ignores its argument and returns `value`, giving an Async[B,E].
export function map-to:[A,B,E]
    &(Async[A,E], B) => Async[B,E]
    &(e,v) => e.{map(&(_) => v)};
/// Observable variant: same mapping with a constant function, giving an
/// Observable[B,E].
export function map-to:[A,B,E]
    &(Observable[A,E], B) => Observable[B,E]
    &(e,v) => e.{map(&(_) => v)};
/// filter(observable, predicate): takes an Observable[T,E] and a function
/// from T to Bool and returns an Observable[T,E].
/// Bound to the native implementation 'observable-filter'.
/// NOTE(review): presumably keeps the values for which the predicate is
/// true; confirm against the native code.
export function filter:[T,E]
    &(Observable[T,E], &(T) => Bool) => Observable[T,E]
    native 'observable-filter';
/// Source variant; same native binding.
export function filter:[T]
    &(Source[T], &(T) => Bool) => Source[T]
    native 'observable-filter';
/// filter-map(observable, f): takes an Observable[A,E] and a function from A
/// to Maybe[B] and returns an Observable[B,E].
/// Bound to the native implementation 'observable-filter-map'.
/// NOTE(review): presumably values mapped to an empty Maybe are dropped and
/// the others are unwrapped; confirm against the native code.
export function filter-map:[A,B,E]
    &(Observable[A,E], &(A) => Maybe[B]) => Observable[B,E]
    native 'observable-filter-map';
/// reduce(observable, (init, f)): takes an Observable[T,E] and a tuple of an
/// A value and a function (A,T) => A, and returns an Async[A,E].
/// Bound to the native implementation 'observable-reduce'.
/// NOTE(review): presumably folds all values starting from the A value and
/// yields the final accumulator on completion; confirm.
export function reduce:[T,A,E]
    &(Observable[T,E], (A, &(A,T) => A)) => Async[A,E]
    native 'observable-reduce';
/// scan(observable, (init, f)): takes the same arguments as `reduce` but
/// returns an Observable[A,E] instead of an Async[A,E].
/// Bound to the native implementation 'observable-scan'.
/// NOTE(review): presumably emits each intermediate accumulator; confirm.
export function scan:[T,A,E]
    &(Observable[T,E], (A, &(A,T) => A)) => Observable[A,E]
    native 'observable-scan';
/// Source variant; same native binding.
export function scan:[T,A]
    &(Source[T], (A, &(A,T) => A)) => Source[A]
    native 'observable-scan';
/// debounce(observable, n): takes an Observable[T,E] and a Number and
/// returns an Observable[T,E].
/// Bound to the native implementation 'debounce-time'.
/// NOTE(review): the Number is presumably a quiet-period duration; its time
/// unit is not visible here, so confirm against the native code.
export function debounce:[T,E]
    &(Observable[T,E], Number) => Observable[T,E]
    native 'debounce-time';
/// Source variant; same native binding.
export function debounce:[T]
    &(Source[T], Number) => Source[T]
    native 'debounce-time';
/// merge-map(observable, f): takes an Observable[A,E] and a function from A
/// to Observable[B,E] and returns an Observable[B,E].
/// Bound to the native implementation 'merge-map'.
/// NOTE(review): the name suggests the ReactiveX mergeMap operator (inner
/// observables run concurrently); confirm against the native code.
export function merge-map:[A,B,E]
    &(Observable[A,E], &(A) => Observable[B,E]) => Observable[B,E]
    native 'merge-map';
/// concat-map(observable, f): takes an Observable[A,E] and a function from A
/// to Observable[B,E] and returns an Observable[B,E].
/// Bound to the native implementation 'concat-map'.
/// NOTE(review): the name suggests the ReactiveX concatMap operator (inner
/// observables run one after another); confirm against the native code.
export function concat-map:[A,B,E]
    &(Observable[A,E], &(A) => Observable[B,E]) => Observable[B,E]
    native 'concat-map';
/// Source variant: the inner effects are Observable[B] (error type never)
/// and the result is a Source[B]. Same native binding.
export function concat-map:[A,B]
    &(Source[A], &(A) => Observable[B]) => Source[B]
    native 'concat-map';
/// mix-map(observable, n, f): takes an Observable[A,E], a Number and a
/// function from A to Observable[B,E], and returns an Observable[B,E].
/// Bound to the native implementation 'mix-map'.
/// NOTE(review): the Number is presumably a limit on concurrently running
/// inner observables; confirm against the native code.
export function mix-map:[A,B,E]
    &(Observable[A,E], Number, &(A) => Observable[B,E]) => Observable[B,E]
    native 'mix-map';
/// switch-map(observable, f): takes an Observable[A,E] and a function from A
/// to Observable[B,E] and returns an Observable[B,E].
/// Bound to the native implementation 'switch-map'.
/// NOTE(review): the name suggests the ReactiveX switchMap operator (a new
/// outer value replaces the running inner observable); confirm.
export function switch-map:[A,B,E]
    &(Observable[A,E], &(A) => Observable[B,E]) => Observable[B,E]
    native 'switch-map';
/// Source variant: the inner effects are Observable[B] (error type never)
/// and the result is a Source[B]. Same native binding.
export function switch-map:[A,B]
    &(Source[A], &(A) => Observable[B]) => Source[B]
    native 'switch-map';
/// Computed variant: maps each A to a Computed[B] and returns a Computed[B].
/// Bound to a separate native implementation, 'switch-map-computed'.
export function switch-map:[A,B]
    &(Computed[A], &(A) => Computed[B]) => Computed[B]
    native 'switch-map-computed';
/// merge(list): takes a List of Observable[T,E] and returns a single
/// Observable[T,E]. Bound to the native implementation 'observable-merge'.
/// NOTE(review): presumably all inputs are subscribed concurrently and their
/// values interleaved; confirm against the native code.
export function merge:[T,E]
    &(List[Observable[T,E]]) => Observable[T,E]
    native 'observable-merge';
/// Variant for a List of Async[never], returning an Async[never].
/// Same native binding.
export function merge:
    &(List[Async[never]]) => Async[never]
    native 'observable-merge';
/// Variant for a List of Source[T], returning a Source[T].
/// Same native binding.
export function merge:[T]
    &(List[Source[T]]) => Source[T]
    native 'observable-merge';
/// concat(list): takes a List of Observable[T,E] and returns a single
/// Observable[T,E]. Bound to the native implementation 'observable-concat'.
/// NOTE(review): presumably the inputs are run one after another in list
/// order; confirm against the native code.
export function concat:[T,E]
    &(List[Observable[T,E]]) => Observable[T,E]
    native 'observable-concat';
/// distinct-until-changed(a): one-argument form for Observable[T,E].
/// The `{ = : Op=[T] }` clause requires an `=` of type Op=[T]; that `=` is
/// passed as the comparator to the two-argument overload.
export function distinct-until-changed:
    [T,E] { = : Op=[T] }
    &(Observable[T,E]) => Observable[T,E]
    &(a) => { distinct-until-changed (a, =) };
/// One-argument form for Source[T]; delegates in the same way.
export function distinct-until-changed:
    [T] { = : Op=[T] }
    &(Source[T]) => Source[T]
    &(a) => { distinct-until-changed (a, =) };
/// distinct-until-changed(a, eq): two-argument form taking an explicit
/// comparator (T,T) => Bool.
/// Bound to the native implementation 'distinct-until-changed'.
/// NOTE(review): presumably a value is dropped when the comparator reports it
/// equal to the previously emitted one; confirm against the native code.
export function distinct-until-changed:[T,E]
    &(Observable[T,E], &(T,T) => Bool) => Observable[T,E]
    native 'distinct-until-changed';
/// Two-argument form for Source[T]; same native binding.
export function distinct-until-changed:[T]
    &(Source[T], &(T,T) => Bool) => Source[T]
    native 'distinct-until-changed';
/// with-latest-from(a, b): takes two Observables with the same error type
/// and returns an Observable of (A, Maybe[B]) pairs.
/// Bound to the native implementation 'with-latest-from'.
/// NOTE(review): the Maybe is presumably empty while b has not emitted yet;
/// confirm against the native code.
export function with-latest-from:[A,B,E]
    &(Observable[A,E], Observable[B,E]) => Observable[(A,Maybe[B]),E]
    native 'with-latest-from';
/// Variant whose second argument is a Reactive[B]: the pairs are (A, B),
/// without the Maybe wrapper.
/// Bound to the native implementation 'with-latest-from-reactive'.
export function with-latest-from:[A,B,E]
    &(Observable[A,E], Reactive[B]) => Observable[(A,B),E]
    native 'with-latest-from-reactive';
/// Source variant of the Reactive overload; same native binding
/// ('with-latest-from-reactive').
export function with-latest-from:[A,B]
    &(Source[A], Reactive[B]) => Source[(A,B)]
    native 'with-latest-from-reactive';
/// Computed variant of the Reactive overload. Bound to a separate native
/// implementation, 'with-latest-from-reactive-to-computed'.
export function with-latest-from:[A,B]
    &(Computed[A], Reactive[B]) => Computed[(A,B)]
    native 'with-latest-from-reactive-to-computed';
/// combine-latest(a, b): takes two Observables with the same error type and
/// returns an Observable of tuples in which every component is wrapped in
/// Maybe. Bound to the native implementation 'combine-latest'.
/// NOTE(review): a component is presumably empty until its input has
/// emitted; confirm against the native code.
export function combine-latest:[A,B,E]
    &(Observable[A,E], Observable[B,E]) =>
        Observable[(Maybe[A],Maybe[B]),E]
    native 'combine-latest';
/// Three-input variant; same native binding.
export function combine-latest:[A,B,C,E]
    &(Observable[A,E],Observable[B,E],Observable[C,E]) =>
        Observable[(Maybe[A],Maybe[B],Maybe[C]),E]
    native 'combine-latest';
/// Four-input variant; same native binding.
export function combine-latest:[A,B,C,D,E]
    &(Observable[A,E],Observable[B,E],Observable[C,E],Observable[D,E]) =>
        Observable[(Maybe[A],Maybe[B],Maybe[C],Maybe[D]),E]
    native 'combine-latest';
/// combine-latest*(a, b): like `combine-latest`, but the tuple components
/// are plain values instead of Maybe-wrapped ones.
/// Bound to the native implementation 'combine-latest*'.
/// NOTE(review): presumably nothing is emitted until every input has
/// produced a value; confirm against the native code.
export function combine-latest*:[A,B,E]
    &(Observable[A,E],Observable[B,E]) => Observable[(A,B),E]
    native 'combine-latest*';
/// Three-input variant; same native binding.
export function combine-latest*:[A,B,C,E]
    &(Observable[A,E],Observable[B,E],Observable[C,E]) => Observable[(A,B,C),E]
    native 'combine-latest*';
/// Four-input variant; same native binding.
export function combine-latest*:[A,B,C,D,E]
    &(Observable[A,E],Observable[B,E],Observable[C,E],Observable[D,E]) => Observable[(A,B,C,D),E]
    native 'combine-latest*';
/// List variant: takes a List of Observable[T,E] and returns an Observable
/// of List[T]. Bound to the native implementation 'combine-latest*-array'.
export function combine-latest*:[T,E]
    &(List[Observable[T,E]]) => Observable[List[T],E]
    native 'combine-latest*-array';
/// combine(a, b): takes two Computed values and returns a Computed of the
/// pair of their value types. Bound to the native implementation 'combine'.
export function combine:[A,B]
    &(Computed[A],Computed[B]) => Computed[(A,B)]
    native 'combine';
/// Three-input variant; same native binding.
export function combine:[A,B,C]
    &(Computed[A],Computed[B],Computed[C]) => Computed[(A,B,C)]
    native 'combine';
/// Four-input variant; same native binding.
export function combine:[A,B,C,D]
    &(Computed[A],Computed[B],Computed[C],Computed[D]) => Computed[(A,B,C,D)]
    native 'combine';
/// List variant: takes a List of Computed[T] and returns a Computed of
/// List[T]. Bound to the native implementation 'combine-array'.
export function combine:[T]
    &(List[Computed[T]]) => Computed[List[T]]
    native 'combine-array';
